[SPARK-22355][SQL] Dataset.collect is not threadsafe #19577
cloud-fan wants to merge 2 commits into apache:master from cloud-fan:encoder
Conversation
Test build #83063 has finished for PR 19577 at commit
```diff
   */
  private def collectFromPlan(plan: SparkPlan): Array[T] = {
-   plan.executeCollect().map(boundEnc.fromRow)
+   val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
```
`fromRow` catches `Exception` during decoding. Shall we also catch it?

It just rethrows the exception, not a big deal.
```diff
  def toLocalIterator(): java.util.Iterator[T] = {
    withAction("toLocalIterator", queryExecution) { plan =>
-     plan.executeToIterator().map(boundEnc.fromRow).asJava
+     val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
```
It would be better to explain with a comment that we keep the projection inside the method for thread safety.
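For instance, the comment could read along these lines (hypothetical wording; the exact text was settled in the follow-up commit):

```scala
// This projection writes its output to a re-usable InternalRow, which makes
// applying it non-thread-safe. Create the projection inside this method so
// that each toLocalIterator() call gets its own output row.
val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
```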
Nice catch! LGTM with two minor comments.
Good catch, LGTM. |
Test build #83091 has finished for PR 19577 at commit
It's possible that users create a `Dataset` and call `collect` on this `Dataset` in many threads at the same time. Currently `Dataset#collect` just calls `encoder.fromRow` to convert Spark rows to objects of type `T`, and this encoder is per-dataset. This means `Dataset#collect` is not thread-safe, because the encoder uses a projection to output the object to a re-usable row.

This PR fixes this problem by creating a new projection when calling `Dataset#collect`, so that we have a re-usable row per method call, instead of per `Dataset`.

How was this patch tested? N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19577 from cloud-fan/encoder.

(cherry picked from commit 5c3a1f3)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
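The patch itself adds no test, so the following is a hypothetical repro sketch of the race it fixes, modeled on the SPARK-29419 example further down this page; it assumes a spark-shell session (so `spark` and its implicits are in scope), and the sizes are illustrative:

```scala
// Hypothetical repro (not from the PR): many threads call collect() on one
// shared Dataset. Before the fix, every thread decodes through the Dataset's
// single encoder and its one re-usable output row, so fields from different
// records can be interleaved.
val pairs = (1 to 1000).map(x => (x, x))
val ds = spark.createDataset(pairs)
(1 to 100).par.foreach { _ =>
  ds.collect().foreach { pair =>
    require(pair._1 == pair._2, s"Pair elements are mismatched: $pair")
  }
}
```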
Thanks! Merged to master/2.2. cc @zsxwing
### What changes were proposed in this pull request?
This PR fixes a thread-safety bug in `SparkSession.createDataset(Seq)`: if the caller-supplied `Encoder` is used in multiple threads, then `createDataset`'s usage of the encoder may lead to incorrect or corrupt results, because the Encoder's internal mutable state will be updated from multiple threads.
Here is an example demonstrating the problem:
```scala
import org.apache.spark.sql._
val enc = implicitly[Encoder[(Int, Int)]]
val datasets = (1 to 100).par.map { _ =>
val pairs = (1 to 100).map(x => (x, x))
spark.createDataset(pairs)(enc)
}
datasets.reduce(_ union _).collect().foreach {
pair => require(pair._1 == pair._2, s"Pair elements are mismatched: $pair")
}
```
Before this PR's change, the above example fails because Spark produces corrupted records where different input records' fields have been co-mingled.
This bug is similar to SPARK-22355 / #19577, which addressed an analogous problem in `Dataset.collect()`.
The fix implemented here is based on #24735's updated version of the `Dataset.collect()` bugfix: use `.copy()`. For consistency, I used the same [code comment](https://github.com/apache/spark/blob/d841b33ba3a9b0504597dbccd4b0d11fa810abf3/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L3414) / explanation as that PR.
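As a rough sketch of that `.copy()` idea (a hedged illustration, not the literal patch: `createDatasetSafely` is a hypothetical caller-side helper, and it assumes `ExpressionEncoder` is a case class whose row-writing projection is lazily initialized, so a copy carries fresh, un-shared mutable state):

```scala
import org.apache.spark.sql.{Dataset, Encoder, SparkSession}
import org.apache.spark.sql.catalyst.encoders.encoderFor

// Each call serializes through its own encoder copy, so concurrent calls
// never write to the same re-usable output row.
def createDatasetSafely[T](spark: SparkSession, data: Seq[T])(implicit enc: Encoder[T]): Dataset[T] =
  spark.createDataset(data)(encoderFor(enc).copy())
```

The fix applies the same principle inside `SparkSession.createDataset` itself, so callers don't have to.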
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Tested manually using the example listed above.
Thanks to smcnamara-stripe for identifying this bug.
Closes #26076 from JoshRosen/SPARK-29419.
Authored-by: Josh Rosen <rosenville@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(cherry picked from commit f4499f6)
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
What changes were proposed in this pull request?

It's possible that users create a `Dataset` and call `collect` on this `Dataset` in many threads at the same time. Currently `Dataset#collect` just calls `encoder.fromRow` to convert Spark rows to objects of type `T`, and this encoder is per-dataset. This means `Dataset#collect` is not thread-safe, because the encoder uses a projection to output the object to a re-usable row.

This PR fixes this problem by creating a new projection when calling `Dataset#collect`, so that we have a re-usable row per method call, instead of per `Dataset`.

How was this patch tested?

N/A
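To make the fix concrete, here is a sketch of the per-call projection pattern in `collectFromPlan`, reconstructed from the diff excerpts above; the body of the `map` (extracting the object via `get(0, null)`) is an assumption about how the generated row is read back, not quoted from the patch:

```scala
private def collectFromPlan(plan: SparkPlan): Array[T] = {
  // The generated projection writes its output to a re-usable InternalRow,
  // so applying it is not thread-safe. Generating it here, per call rather
  // than per Dataset, gives every collect() its own output row.
  val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
  plan.executeCollect().map { row =>
    // Assumption: the projected row ignores the data-type argument of `get`,
    // so null is safe when extracting the single deserialized object.
    objProj(row).get(0, null).asInstanceOf[T]
  }
}
```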